package consumer

import (
	"context"
	"io"
	"time"

	"gitlab.com/go-course-project/go15/devcloud-mini/maudit/apps/event"
)

// 处理事件
func (i *impl) HandleEvent(ctx context.Context) {
	i.log.Info().Msg("start handle event ....")

	for {
		// 手动获取消息
		m, err := i.consumer.FetchMessage(ctx)
		if err != nil {
			if err == io.EOF {
				i.log.Info().Msg("reader closed")
				return
			}
			i.log.Error().Msgf("featch message error, %s", err)
			time.Sleep(5 * time.Second)
			continue
		}

		// 处理消息
		e := event.NewEvent()
		i.log.Debug().Msgf("message at topic/partition/offset %v/%v/%v", m.Topic, m.Partition, m.Offset)

		// 发送的数据时Json格式
		err = e.Load(m.Value)
		if err == nil {
			if err := i.svc.SaveEvent(ctx, event.NewEventSet().Add(e)); err != nil {
				i.log.Error().Msgf("save event error, %s", err)
			}
		}

		// 处理完消息后需要提交该消息已经消费完成, 消费者挂掉后保存消息消费的状态
		if err := i.consumer.CommitMessages(ctx, m); err != nil {
			i.log.Error().Msgf("failed to commit messages: %s", err)
		}
	}

}
